package redis

import (
	"context"
	"fmt"
	"os"
	"sort"

	"git.blindage.org/21h/redis-operator/pkg/controller/manifests"
	rediscli "github.com/go-redis/redis"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/intstr"

	blindagev1alpha1 "git.blindage.org/21h/redis-operator/pkg/apis/blindage/v1alpha1"
	raven "github.com/getsentry/raven-go"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/api/policy/v1beta1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

var log = logf.Log.WithName("controller_redis")

func init() {
	if os.Getenv("SENTRY_DSN") != "" {
		raven.SetDSN(os.Getenv("SENTRY_DSN"))
	}
}

func Add(mgr manager.Manager) error {
	return add(mgr, newReconciler(mgr))
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
	return &ReconcileRedis{client: mgr.GetClient(), scheme: mgr.GetScheme()}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
	// Create a new controller
	c, err := controller.New("redis-controller", mgr, controller.Options{Reconciler: r})
	if err != nil {
		return err
	}

	// Watch for changes to primary resource Redis
	err = c.Watch(&source.Kind{Type: &blindagev1alpha1.Redis{}}, &handler.EnqueueRequestForObject{})
	if err != nil {
		return err
	}

	// Watch for changes to secondary resource Pods and requeue the owner Redis
	err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
		IsController: true,
		OwnerType:    &blindagev1alpha1.Redis{},
	})
	if err != nil {
		return err
	}

	err = c.Watch(&source.Kind{Type: &v1.StatefulSet{}}, &handler.EnqueueRequestForOwner{
		IsController: true,
		OwnerType:    &blindagev1alpha1.Redis{},
	})
	if err != nil {
		return err
	}

	return nil
}

// blank assignment to verify that ReconcileRedis implements reconcile.Reconciler
var _ reconcile.Reconciler = &ReconcileRedis{}

// ReconcileRedis reconciles a Redis object
type ReconcileRedis struct {
	client client.Client
	scheme *runtime.Scheme
}

// Reconcile means magic begins
func (r *ReconcileRedis) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Reconciling Redis")

	// Fetch the Redis instance
	instance := &blindagev1alpha1.Redis{}
	err := r.client.Get(context.TODO(), request.NamespacedName, instance)
	if err != nil {
		if errors.IsNotFound(err) {
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, err
	}

	_, err = r.reconcileFinalizers(reqLogger, instance)
	if err != nil {
		raven.CaptureErrorAndWait(err, nil)
		return reconcile.Result{}, err
	}

	// Prepare Sentinel config
	configSentinelName := instance.Name + "-sentinel"
	configSentinelTemplate := `
sentinel monitor redismaster %v 6379 %v
sentinel down-after-milliseconds redismaster 1000
sentinel failover-timeout redismaster 3000
sentinel parallel-syncs redismaster 2
`
	configSentinelData := map[string]string{"sentinel.conf": fmt.Sprintf(configSentinelTemplate, instance.Name+"-redis", instance.Spec.Quorum)}

	if _, err := r.ReconcileConfigmap(reqLogger, instance, configSentinelName, configSentinelData); err != nil {
		return reconcile.Result{}, err
	}

	configRedisName := instance.Name + "-redis"
	configRedisData := map[string]string{"redis.conf": `
slaveof 127.0.0.1 6379
tcp-keepalive 60
save 900 1
save 300 10
`}

	if _, err := r.ReconcileConfigmap(reqLogger, instance, configRedisName, configRedisData); err != nil {
		return reconcile.Result{}, err
	}

	configFailoverName := instance.Name + "-failover"
	configFailoverData := map[string]string{"failover.sh": `
MASTER_HOST=$(redis-cli -h ${SENTINEL_SERVICE} -p 26379 --csv SENTINEL get-master-addr-by-name redismaster | tr ',' ' ' | tr -d '\"' |cut -d' ' -f1)
if [[ ${MASTER_HOST} == $(hostname -i) ]]; then
  redis-cli -h ${SENTINEL_SERVICE} -p 26379 SENTINEL failover redismaster
fi
`}

	if _, err := r.ReconcileConfigmap(reqLogger, instance, configFailoverName, configFailoverData); err != nil {
		return reconcile.Result{}, err
	}

	// reconcile Sentinel deployment
	newSentinelDeployment := manifests.GenerateDeployment(instance)
	if _, err := r.ReconcileDeployment(reqLogger, instance, newSentinelDeployment); err != nil {
		return reconcile.Result{}, err
	}

	// reconcile Redis StatefulSet
	newRedisStatefulset := manifests.GenerateStatefulSet(instance)
	if _, err := r.ReconcileStatefulSet(reqLogger, instance, newRedisStatefulset); err != nil {
		return reconcile.Result{}, err
	}

	// create sentinel and redis services
	serviceName := instance.Name + "-sentinel"
	servicePortName := "sentinel"
	servicePort := int32(26379)
	serviceSelector := map[string]string{"component": "sentinel"}
	if _, err := r.ReconcileService(reqLogger, instance, serviceName, map[string]int32{servicePortName: servicePort}, serviceSelector); err != nil {
		return reconcile.Result{}, err
	}

	serviceName = instance.Name + "-redis"
	servicePortName = "redis"
	servicePort = int32(6379)
	serviceSelector = map[string]string{"component": "redis"}
	if _, err := r.ReconcileService(reqLogger, instance, serviceName, map[string]int32{servicePortName: servicePort}, serviceSelector); err != nil {
		return reconcile.Result{}, err
	}

	// create PDB resources
	if instance.Spec.PdbRedis != nil {
		pdbName := instance.Name + "-redis"
		pdbSpec := v1beta1.PodDisruptionBudgetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: manifests.MergeLabels(manifests.BaseLabels(instance), map[string]string{"component": "redis"}),
			},
		}

		if instance.Spec.PdbRedis.MaxUnavailable != nil {
			pdbSpec.MaxUnavailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbRedis.MaxUnavailable}
		}
		if instance.Spec.PdbRedis.MinAvailable != nil {
			pdbSpec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbRedis.MinAvailable}
		}

		if _, err := r.ReconcilePodDisruptionBudget(reqLogger, instance, pdbName, pdbSpec); err != nil {
			return reconcile.Result{}, err
		}
	} else {
		pdb := v1beta1.PodDisruptionBudget{
			ObjectMeta: metav1.ObjectMeta{
				Name:      instance.Name + "-redis",
				Namespace: instance.Namespace,
			},
			Spec: v1beta1.PodDisruptionBudgetSpec{},
		}

		err := r.client.Delete(context.TODO(), &pdb)
		if err != nil && !errors.IsNotFound(err) {
			raven.CaptureErrorAndWait(err, nil)
			reqLogger.Info("PodDisruptionBudget deletion error", "Namespace", pdb.Namespace, "Name", pdb.Name, "Error", err)
			return reconcile.Result{}, err
		}
	}

	if instance.Spec.PdbSentinel != nil {
		pdbName := instance.Name + "-sentinel"
		pdbSpec := v1beta1.PodDisruptionBudgetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: manifests.MergeLabels(manifests.BaseLabels(instance), map[string]string{"component": "sentinel"}),
			},
		}
		if instance.Spec.PdbSentinel.MaxUnavailable != nil {
			// adorable if MaxUnavailable < (SentinelReplicas/2), just to save quorum
			if *instance.Spec.PdbSentinel.MaxUnavailable > (*instance.Spec.SentinelReplicas / 2) {
				reqLogger.Error(err, "Sentinel MaxUnavailable must be lesser then sentinelReplicas/2 to save quorum", "Namespace", instance.Namespace, "Name", instance.Name)
				return reconcile.Result{}, err
			}
			pdbSpec.MaxUnavailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbSentinel.MaxUnavailable}
		}
		if instance.Spec.PdbSentinel.MinAvailable != nil {
			// adorable if MinAvailable > (SentinelReplicas/2), just to save quorum
			if *instance.Spec.PdbSentinel.MinAvailable < (*instance.Spec.SentinelReplicas / 2) {
				reqLogger.Error(err, "Sentinel MinAvailable must be greater then sentinelReplicas/2 to save quorum", "Namespace", instance.Namespace, "Name", instance.Name)
				return reconcile.Result{}, err
			}
			pdbSpec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbSentinel.MinAvailable}
		}

		if _, err := r.ReconcilePodDisruptionBudget(reqLogger, instance, pdbName, pdbSpec); err != nil {
			return reconcile.Result{}, err
		}
	} else {
		pdb := v1beta1.PodDisruptionBudget{
			ObjectMeta: metav1.ObjectMeta{
				Name:      instance.Name + "-sentinel",
				Namespace: instance.Namespace,
			},
			Spec: v1beta1.PodDisruptionBudgetSpec{},
		}

		err := r.client.Delete(context.TODO(), &pdb)
		if err != nil && !errors.IsNotFound(err) {
			raven.CaptureErrorAndWait(err, nil)
			reqLogger.Info("PodDisruptionBudget deletion error", "Namespace", pdb.Namespace, "Name", pdb.Name, "Error", err)
			return reconcile.Result{}, err
		}

	}

	if instance.Spec.PdbHaproxy != nil {
		pdbName := instance.Name + "-haproxy"
		pdbSpec := v1beta1.PodDisruptionBudgetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: manifests.MergeLabels(manifests.BaseLabels(instance), map[string]string{"component": "haproxy"}),
			},
		}
		if instance.Spec.PdbHaproxy.MaxUnavailable != nil {
			pdbSpec.MaxUnavailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbHaproxy.MaxUnavailable}
		}
		if instance.Spec.PdbHaproxy.MinAvailable != nil {
			pdbSpec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *instance.Spec.PdbHaproxy.MinAvailable}
		}

		if _, err := r.ReconcilePodDisruptionBudget(reqLogger, instance, pdbName, pdbSpec); err != nil {
			return reconcile.Result{}, err
		}
	} else {
		pdb := v1beta1.PodDisruptionBudget{
			ObjectMeta: metav1.ObjectMeta{
				Name:      instance.Name + "-haproxy",
				Namespace: instance.Namespace,
			},
			Spec: v1beta1.PodDisruptionBudgetSpec{},
		}

		err := r.client.Delete(context.TODO(), &pdb)
		if err != nil && !errors.IsNotFound(err) {
			raven.CaptureErrorAndWait(err, nil)
			reqLogger.Info("PodDisruptionBudget deletion error", "Namespace", pdb.Namespace, "Name", pdb.Name, "Error", err)
			return reconcile.Result{}, err
		}

	}

	// set Redis master

	podList := &corev1.PodList{}
	labelSelector := labels.SelectorFromSet(newRedisStatefulset.Labels)
	listOpts := &client.ListOptions{
		Namespace:     newRedisStatefulset.Namespace,
		LabelSelector: labelSelector,
	}
	err = r.client.List(context.TODO(), listOpts, podList)
	if err != nil {
		reqLogger.Error(err, "Failed to list Pods.", "Namespace", instance.Namespace, "Name", instance.Name)
		return reconcile.Result{}, err
	}

	if len(podList.Items) < 1 {
		reqLogger.Error(err, "Pods < 0", "Namespace", instance.Namespace, "Name", instance.Name)
		return reconcile.Result{}, err
	}

	// Order the pods so we start by the oldest one
	sort.Slice(podList.Items, func(i, j int) bool {
		return podList.Items[i].CreationTimestamp.Before(&podList.Items[j].CreationTimestamp)
	})

	newMasterIP := ""
	podIPs := []string{}

	for _, pod := range podList.Items {
		// pod will be deleted, skip
		if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
			continue
		}

		if pod.Status.Phase == corev1.PodPending || pod.Status.Phase == corev1.PodRunning {
			// for haproxy if enabled
			podIPs = append(podIPs, pod.Status.PodIP)

			if newMasterIP == "" {
				newMasterIP = pod.Status.PodIP
				reqLogger.Info("New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
				if err := querySetMaster(newMasterIP); err != nil {
					reqLogger.Error(err, "Error! New master ip", newMasterIP, instance.Namespace, "Name", instance.Name)
					return reconcile.Result{}, err
				}
			} else {
				reqLogger.Info("Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
				if err := querySetSlaveOf(pod.Status.PodIP, newMasterIP); err != nil {
					reqLogger.Error(err, "Error! Redis", pod.Name, "slaveof", newMasterIP, instance.Namespace, "Name", instance.Name)
					return reconcile.Result{}, err
				}
			}
		}
	}

	// haproxy

	// check if you need haproxy
	if instance.Spec.UseHAProxy {

		configHaproxyShepherdName := instance.Name + "-haproxy-shepherd"
		configHaproxyShepherdData := map[string]string{"shepherd.sh": `
#!/bin/sh
echo "Start"
MONFILE='/usr/local/etc/haproxy/haproxy.cfg'
PIDFILE='/run/haproxy.pid'
MD5FILE='/tmp/haproxy.cfg.md5'

touch ${MD5FILE}

while true
do
	MD5LAST="$(cat ${MD5FILE})"
	echo "Read MD5 of ${MD5FILE}: ${MD5LAST}"
	if [ -z "${MD5LAST}" ]
	then
		echo "First time check, md5 file is empty"
		echo "$(md5sum ${MONFILE})" > ${MD5FILE}
	else
		echo "Get md5 and compare with last time"
		MD5CURRENT="$(md5sum ${MONFILE})"
		if [ "${MD5CURRENT}" != "${MD5LAST}" ]
		then
			echo "Send signal to haproxy"
			kill -HUP $(cat ${PIDFILE})
			echo "${MD5CURRENT}" > ${MD5FILE}
		fi
	fi
	# sleep 5 seconds, it will be enough to not disturb haproxy while pods rapidly creates or dies 
	sleep 5
done
`}

		if _, err := r.ReconcileConfigmap(reqLogger, instance, configHaproxyShepherdName, configHaproxyShepherdData); err != nil {
			return reconcile.Result{}, err
		}

		redisEndpointTemplate := "  server redis_backend_%v %v:6379 maxconn 1024 check inter %vs\n"
		redisEndpoints := ""

		haproxyBackendCheckInterval := 1
		if instance.Spec.HAProxyBackendCheckInterval > 0 {
			haproxyBackendCheckInterval = instance.Spec.HAProxyBackendCheckInterval
		}

		for num, ip := range podIPs {
			redisEndpoints = redisEndpoints + fmt.Sprintf(redisEndpointTemplate, num, ip, haproxyBackendCheckInterval)
		}

		configHaproxyConfigName := instance.Name + "-haproxy"
		configHaproxyConfigData := map[string]string{"haproxy.cfg": `
global
	pidfile /run/haproxy.pid
defaults
	mode tcp
	timeout connect 5s
	timeout server %vs
	timeout client %vs
	option  tcpka
listen stats
	mode http
	bind :9000
	stats enable
	stats hide-version
	stats realm Haproxy\ Statistics
	stats uri /haproxy_stats
frontend ft_redis
	mode tcp
	bind *:6379
	default_backend bk_redis
backend bk_redis
	mode tcp
	option tcp-check
	tcp-check send PING\r\n
	tcp-check expect string +PONG
	tcp-check send info\ replication\r\n
	tcp-check expect string role:master
	tcp-check send QUIT\r\n
	tcp-check expect string +OK
`}

		haproxyTimeoutServer := 30
		if instance.Spec.HAProxyTimeoutServer > 0 {
			haproxyTimeoutServer = instance.Spec.HAProxyTimeoutServer
		}

		haproxyTimeoutClient := 30
		if instance.Spec.HAProxyTimeoutClient > 0 {
			haproxyTimeoutClient = instance.Spec.HAProxyTimeoutClient
		}

		configHaproxyConfigData["haproxy.cfg"] = fmt.Sprintf(configHaproxyConfigData["haproxy.cfg"], haproxyTimeoutServer, haproxyTimeoutClient) + redisEndpoints

		if _, err := r.ReconcileConfigmap(reqLogger, instance, configHaproxyConfigName, configHaproxyConfigData); err != nil {
			return reconcile.Result{}, err
		}

		// reconcile HAProxy deployment
		newHAProxyDeployment := manifests.GenerateHaproxyDeployment(instance)
		if _, err := r.ReconcileDeployment(reqLogger, instance, newHAProxyDeployment); err != nil {
			return reconcile.Result{}, err
		}

		// create haproxy service
		serviceName = instance.Name + "-haproxy"
		servicePortName = "haproxy"
		servicePort = int32(6379)
		serviceSelector = map[string]string{"component": "haproxy"}
		if _, err := r.ReconcileService(reqLogger, instance, serviceName, map[string]int32{servicePortName: servicePort, "stats": 9000}, serviceSelector); err != nil {
			return reconcile.Result{}, err
		}

	}

	reqLogger.Info("Reconcile complete", "Namespace", instance.Namespace, "Name", instance.Name)
	return reconcile.Result{}, nil
}

func querySetMaster(ip string) error {
	options := &rediscli.Options{
		Addr:     fmt.Sprintf("%s:%s", ip, "6379"),
		Password: "",
		DB:       0,
	}
	rClient := rediscli.NewClient(options)
	defer rClient.Close()
	if res := rClient.SlaveOf("NO", "ONE"); res.Err() != nil {
		return res.Err()
	}
	return nil
}

func querySetSlaveOf(ip string, masterIP string) error {
	options := &rediscli.Options{
		Addr:     fmt.Sprintf("%s:%s", ip, "6379"),
		Password: "",
		DB:       0,
	}
	rClient := rediscli.NewClient(options)
	defer rClient.Close()
	if res := rClient.SlaveOf(masterIP, "6379"); res.Err() != nil {
		return res.Err()
	}
	return nil
}
